package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.AndFilter;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.filter.NoLocalFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.SubscriptionContainer;
import org.codehaus.activemq.service.TopicMessageContainer;
import org.codehaus.activemq.store.PersistenceAdapter;

/**
 * A message container manager for durable topic consumers.
 * <p>
 * Persistent messages sent to a topic are stored in a
 * {@link TopicMessageContainer} obtained from the {@link PersistenceAdapter}
 * and offered to every subscription held by the {@link SubscriptionContainer}.
 * Subscriptions that currently have a registered consumer are additionally
 * tracked in {@link #activeSubscriptions}, keyed by consumer id.
 */
public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport
{
  /** Factory for the topic message containers created by {@link #getContainer(String)}. */
  private PersistenceAdapter persistenceAdapter;

  /** Holds every known subscription; iterated on each send and on delete. */
  protected SubscriptionContainer subscriptionContainer;

  /** Builds the destination/selector filter for new subscriptions. */
  protected FilterFactory filterFactory;

  /** Consumer id to {@link Subscription}, for consumers added via {@link #doAddMessageConsumer}. */
  protected Map activeSubscriptions = new ConcurrentHashMap();

  /** Set once {@link #loadAllMessageContainers()} has run; accessed under {@code synchronized (this)}. */
  private boolean loadedMessageContainers;

  /**
   * Creates a manager using a {@link DurableTopicSubscriptionContainerImpl},
   * a {@link FilterFactoryImpl} and a {@link DispatcherImpl}.
   *
   * @param persistenceAdapter the adapter used to create topic message containers
   */
  public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter)
  {
    this(persistenceAdapter, new DurableTopicSubscriptionContainerImpl(), new FilterFactoryImpl(), new DispatcherImpl());
  }

  /**
   * Creates a manager with explicitly supplied collaborators.
   *
   * @param persistenceAdapter    the adapter used to create topic message containers
   * @param subscriptionContainer the container holding all subscriptions
   * @param filterFactory         the factory used to build subscription filters
   * @param dispatcher            the dispatcher handed to the superclass and to new subscriptions
   */
  public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher)
  {
    super(dispatcher);
    this.persistenceAdapter = persistenceAdapter;
    this.subscriptionContainer = subscriptionContainer;
    this.filterFactory = filterFactory;
  }

  /**
   * Registers a consumer, but only if it is a durable topic consumer;
   * any other consumer is ignored by this manager.
   *
   * @param client the broker client that owns the consumer
   * @param info   the description of the consumer
   * @throws JMSException if the registration fails
   */
  public void addMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    if (info.isDurableTopic()) {
      doAddMessageConsumer(client, info);
    }
  }

  /**
   * Unregisters a consumer: its subscription is marked inactive and removed
   * from the dispatcher's set of active subscriptions.
   *
   * @param client the broker client that owns the consumer
   * @param info   the description of the consumer
   * @throws JMSException if the removal fails
   */
  public void removeMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    // NOTE(review): this also drops the subscription from the subscription
    // container, yet deleteSubscription() and sendMessage() iterate that
    // container and handle inactive subscriptions. Confirm whether a durable
    // subscription is meant to survive its consumer being closed.
    this.subscriptionContainer.removeSubscription(info.getConsumerId());

    Subscription sub = (Subscription)this.activeSubscriptions.remove(info.getConsumerId());
    if (sub != null) {
      sub.setActive(false);
      this.dispatcher.removeActiveSubscription(client, sub);
    }
  }

  /**
   * Deletes every subscription matching the given client id and subscriber
   * name, clearing each one after removing it from the subscription container.
   *
   * @param clientId       the client id of the subscription to delete
   * @param subscriberName the subscriber name of the subscription to delete
   * @throws JMSException if a matching subscription is still active
   * @throws IllegalStateException (the {@code javax.jms} one) if no matching
   *         subscription exists
   */
  public void deleteSubscription(String clientId, String subscriberName)
    throws JMSException
  {
    boolean subscriptionFound = false;
    // NOTE(review): subscriptions are removed from the container while its
    // iterator is in use; verify the container's iterator tolerates this.
    for (Iterator i = this.subscriptionContainer.subscriptionIterator(); i.hasNext(); ) {
      Subscription sub = (Subscription)i.next();
      if (sub.getClientId().equals(clientId) && sub.getSubscriberName().equals(subscriberName)) {
        if (sub.isActive()) {
          throw new JMSException("The Consummer " + subscriberName + " is still active");
        }

        this.subscriptionContainer.removeSubscription(sub.getConsumerId());
        sub.clear();
        subscriptionFound = true;
      }
    }

    if (!subscriptionFound) {
      throw new IllegalStateException("The Consumer " + subscriberName + " does not exist for client: " + clientId);
    }
  }

  /**
   * Stores a persistent topic message in its container and offers it to every
   * subscription that reports the message as a target. Messages without a
   * destination, not addressed to a topic, or not persistent are ignored.
   *
   * @param client  the broker client that sent the message
   * @param message the message to route
   * @throws JMSException if storing or routing the message fails
   */
  public void sendMessage(BrokerClient client, ActiveMQMessage message)
    throws JMSException
  {
    ActiveMQDestination dest = (ActiveMQDestination)message.getJMSDestination();
    if (dest == null || !dest.isTopic()
        || message.getJMSDeliveryMode() != javax.jms.DeliveryMode.PERSISTENT) {
      return;
    }

    MessageContainer container = getContainer(dest.toString());
    container.addMessage(message);

    for (Iterator i = this.subscriptionContainer.subscriptionIterator(); i.hasNext(); ) {
      Subscription sub = (Subscription)i.next();
      if (sub.isTarget(message)) {
        sub.addMessage(container, message);
      }
    }
  }

  /**
   * Forwards an acknowledgement to the active subscription of the acknowledging
   * consumer; does nothing if that consumer has no active subscription.
   *
   * @param client the broker client that sent the acknowledgement
   * @param ack    the acknowledgement
   * @throws JMSException if the subscription fails to process the acknowledgement
   */
  public void acknowledgeMessage(BrokerClient client, MessageAck ack)
    throws JMSException
  {
    Subscription sub = (Subscription)this.activeSubscriptions.get(ack.getConsumerId());
    if (sub != null) {
      sub.messageConsumed(ack);
    }
  }

  /**
   * Forwards a transacted acknowledgement to the active subscription of the
   * acknowledging consumer; does nothing if there is none. The transaction id
   * is not used here.
   *
   * @param client        the broker client that sent the acknowledgement
   * @param transactionId the id of the enclosing transaction (unused)
   * @param ack           the acknowledgement
   * @throws JMSException if the subscription fails to process the acknowledgement
   */
  public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException
  {
    Subscription sub = (Subscription)this.activeSubscriptions.get(ack.getConsumerId());
    if (sub != null) {
      sub.onAcknowledgeTransactedMessageBeforeCommit(ack);
    }
  }

  /**
   * Asks the consumer's active subscription to redeliver a message from the
   * first message container found to contain it. Does nothing if the consumer
   * has no active subscription or no container holds the message.
   *
   * @param client the broker client requesting redelivery
   * @param ack    identifies the consumer and the message to redeliver
   * @throws JMSException if the redelivery fails
   */
  public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException
  {
    Subscription sub = (Subscription)this.activeSubscriptions.get(ack.getConsumerId());
    if (sub == null) {
      return;
    }

    for (Iterator iter = this.messageContainers.values().iterator(); iter.hasNext(); ) {
      MessageContainer container = (MessageContainer)iter.next();
      if (container.containsMessage(ack.getMessageIdentity())) {
        sub.redeliverMessage(container, ack);
        // Only the first container holding the message is used.
        break;
      }
    }
  }

  /**
   * No-op in this manager.
   *
   * @throws JMSException never thrown by this implementation
   */
  public void poll()
    throws JMSException
  {
  }

  /**
   * No-op in this manager.
   *
   * @param client        the broker client committing the transaction
   * @param transactionId the id of the transaction
   */
  public void commitTransaction(BrokerClient client, String transactionId)
  {
  }

  /**
   * No-op in this manager.
   *
   * @param client        the broker client rolling back the transaction
   * @param transactionId the id of the transaction
   */
  public void rollbackTransaction(BrokerClient client, String transactionId)
  {
  }

  /**
   * Returns the message container registered under the given destination
   * name, creating, starting and registering one via the persistence adapter
   * if none exists yet.
   *
   * @param destinationName the key under which the container is registered
   * @return the existing or newly created container
   * @throws JMSException if the container cannot be created or started
   */
  public MessageContainer getContainer(String destinationName)
    throws JMSException
  {
    TopicMessageContainer container = (TopicMessageContainer)this.messageContainers.get(destinationName);
    if (container == null) {
      container = this.persistenceAdapter.createTopicMessageContainer(destinationName);
      container.start();
      this.messageContainers.put(destinationName, container);
    }
    return container;
  }

  /**
   * Attaches a consumer to its subscription, creating the subscription when
   * none exists and re-creating it when the consumer's destination or selector
   * differs from the stored one. The subscription is registered as active with
   * this manager and the dispatcher, recovered if newly created because none
   * existed, and finally marked active.
   *
   * @param client the broker client that owns the consumer
   * @param info   the description of the consumer
   * @throws JMSException if the duplicate check, creation or recovery fails
   */
  protected void doAddMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    boolean shouldRecover = false;
    if (info.getConsumerName() != null && info.getClientId() != null) {
      this.subscriptionContainer.checkForDuplicateDurableSubscription(client, info);
    }

    Subscription subscription = this.subscriptionContainer.getSubscription(info.getConsumerId());
    if (subscription != null && subscription.isDurableTopic()) {
      // Compare the stored subscription against the incoming consumer info.
      // Null-safe: a subscription or consumer without a selector must not
      // raise a NullPointerException here.
      boolean changed = !nullSafeEquals(subscription.getDestination(), info.getDestination())
          || !nullSafeEquals(subscription.getSelector(), info.getSelector());
      if (changed) {
        this.subscriptionContainer.removeSubscription(info.getConsumerId());
        subscription.clear();
        subscription = this.subscriptionContainer.makeSubscription(this.dispatcher, info, createFilter(info));
      }
    }
    else {
      subscription = this.subscriptionContainer.makeSubscription(this.dispatcher, info, createFilter(info));
      shouldRecover = true;
    }

    subscription.setActiveConsumer(info);
    this.activeSubscriptions.put(info.getConsumerId(), subscription);

    this.dispatcher.addActiveSubscription(client, subscription);

    if (shouldRecover) {
      recoverSubscriptions(subscription);
    }

    // Marked active only after registration and any recovery have completed.
    subscription.setActive(true);
  }

  /**
   * Asks every known topic message container to recover the given
   * subscription. For wildcard subscriptions, {@link #loadAllMessageContainers()}
   * is invoked first, at most once per manager instance.
   *
   * @param subscription the subscription to recover
   * @throws JMSException if a container fails to recover the subscription
   */
  protected void recoverSubscriptions(Subscription subscription)
    throws JMSException
  {
    if (subscription.isWildcard()) {
      synchronized (this) {
        if (!this.loadedMessageContainers) {
          loadAllMessageContainers();
          this.loadedMessageContainers = true;
        }
      }
    }

    // Called for its side effect: makes sure the container for the
    // subscription's own destination is registered before the loop below.
    getContainer(subscription.getDestination().getPhysicalName());

    for (Iterator iter = this.messageContainers.values().iterator(); iter.hasNext(); ) {
      TopicMessageContainer container = (TopicMessageContainer)iter.next();
      container.recoverSubscription(subscription);
    }
  }

  /**
   * Hook invoked before recovering the first wildcard subscription.
   * Empty in this class.
   */
  protected void loadAllMessageContainers()
  {
  }

  /**
   * Builds the filter for a consumer from its destination and selector,
   * and-ed with a {@link NoLocalFilter} on the consumer's client id when the
   * consumer is flagged as no-local.
   *
   * @param info the description of the consumer
   * @return the filter to apply to the consumer's subscription
   * @throws JMSException if the filter factory fails to create the filter
   */
  protected Filter createFilter(ConsumerInfo info)
    throws JMSException
  {
    Filter filter = this.filterFactory.createFilter(info.getDestination(), info.getSelector());
    if (info.isNoLocal()) {
      filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
    }
    return filter;
  }

  /**
   * Null-safe equality: two nulls are equal, a null never equals a non-null.
   *
   * @param left  the first value, may be null
   * @param right the second value, may be null
   * @return true if both are null or {@code left.equals(right)}
   */
  private static boolean nullSafeEquals(Object left, Object right)
  {
    if (left == null) {
      return right == null;
    }
    return left.equals(right);
  }
}